#!/usr/bin/env python3

import argparse
import contextlib
import glob
import os
import shlex
import subprocess
import sys
import tempfile
from typing import TYPE_CHECKING, Iterator, List, Type, cast
from unittest import mock

if TYPE_CHECKING:
    # This script does not have access to the zerver module during runtime.
    # We can only import this when type checking.
    from zerver.lib.test_runner import Runner

# Locate the directory holding this script, then make its parent directory
# both the working directory and the first entry on sys.path.  The relative
# paths used later in this file ("var/...", "tools/...") and the `tools.lib`
# import below rely on this, so these statements must run before them.
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
os.chdir(os.path.dirname(TOOLS_DIR))
sys.path.insert(0, os.path.dirname(TOOLS_DIR))

# check for the venv
from tools.lib import sanity_check

sanity_check.check_venv(__file__)

import django
import orjson
import responses
from django.conf import settings
from django.test.utils import get_runner

# Recursive glob patterns, one per top-level app directory, selecting the
# Python files considered when building enforce_fully_covered.
source_files = [
    f"{app_dir}/**/*.py"
    for app_dir in (
        "analytics",
        "confirmation",
        "corporate",
        "pgroonga",
        "zerver",
        "zilencer",
        "zproject",
    )
]

# Paths and glob patterns for files that are exempt from the 100% coverage
# requirement.  Everything matched here is subtracted from source_files when
# computing enforce_fully_covered.  main() also reports an error for any entry
# that turns out to have no missing lines, so that it gets removed from this
# list once it reaches full coverage.
not_yet_fully_covered = [
    "*/migrations/*.py",
    "*/management/commands/*.py",
    # Analytics fixtures library is used to generate test fixtures;
    # isn't properly accounted for in test coverage analysis since it
    # runs before tests.
    "analytics/lib/fixtures.py",
    # We have 100% coverage on the new stuff; need to refactor old stuff.
    "analytics/views/activity_common.py",
    "analytics/views/installation_activity.py",
    "analytics/views/realm_activity.py",
    "analytics/views/stats.py",
    "analytics/views/support.py",
    # TODO: This is a work in progress and therefore without
    # tests yet.
    "corporate/views/remote_billing_page.py",
    "corporate/lib/remote_billing_util.py",
    # Major lib files should have 100% coverage
    "zerver/lib/addressee.py",
    "zerver/lib/markdown/__init__.py",
    "zerver/lib/cache.py",
    "zerver/lib/cache_helpers.py",
    "zerver/lib/i18n.py",
    "zerver/lib/send_email.py",
    "zerver/lib/url_preview/preview.py",
    "zerver/worker/queue_processors.py",
    # Markdown sub-libs should have full coverage too; a lot are really close
    "zerver/lib/markdown/api_arguments_table_generator.py",
    "zerver/lib/markdown/fenced_code.py",
    "zerver/lib/markdown/help_relative_links.py",
    "zerver/lib/markdown/nested_code_blocks.py",
    # Other lib files that ideally would have coverage, but aren't sorted
    "zerver/filters.py",
    "zerver/middleware.py",
    "zerver/lib/bot_lib.py",
    "zerver/lib/camo.py",
    "zerver/lib/debug.py",
    "zerver/lib/export.py",
    "zerver/lib/fix_unreads.py",
    "zerver/lib/import_realm.py",
    "zerver/lib/logging_util.py",
    "zerver/lib/profile.py",
    "zerver/lib/queue.py",
    "zerver/lib/sqlalchemy_utils.py",
    "zerver/lib/storage.py",
    "zerver/lib/zephyr.py",
    "zerver/lib/templates.py",
    # Low priority for coverage
    "zerver/lib/ccache.py",
    "zerver/lib/generate_test_data.py",
    "zerver/lib/server_initialization.py",
    "zerver/lib/test_fixtures.py",
    "zerver/lib/test_runner.py",
    "zerver/lib/test_console_output.py",
    "zerver/openapi/curl_param_value_generators.py",
    "zerver/openapi/javascript_examples.py",
    "zerver/openapi/python_examples.py",
    "zerver/openapi/test_curl_examples.py",
    # Tornado should ideally have full coverage, but we're not there.
    "zerver/tornado/descriptors.py",
    "zerver/tornado/django_api.py",
    "zerver/tornado/event_queue.py",
    "zerver/tornado/exceptions.py",
    "zerver/tornado/handlers.py",
    "zerver/tornado/ioloop_logging.py",
    "zerver/tornado/sharding.py",
    "zerver/tornado/views.py",
    # Data import files; relatively low priority
    "zerver/data_import/sequencer.py",
    "zerver/data_import/slack.py",
    "zerver/data_import/gitter.py",
    "zerver/data_import/import_util.py",
    # Webhook integrations with incomplete coverage
    "zerver/webhooks/greenhouse/view.py",
    "zerver/webhooks/jira/view.py",
    "zerver/webhooks/solano/view.py",
    "zerver/webhooks/teamcity/view.py",
    "zerver/webhooks/travis/view.py",
    "zerver/webhooks/zapier/view.py",
    # This is hard to get test coverage for, and low value to do so
    "zerver/views/sentry.py",
    # Cannot have coverage, as tests run in a transaction
    "zerver/lib/safe_session_cached_db.py",
    "zerver/lib/singleton_bmemcached.py",
    # Only covered when we run migrations tests, which are not
    # guaranteed to exist or be runnable
    "zerver/lib/migrate.py",
    # Branches in Django settings files are hard to test
    "zproject/computed_settings.py",
    # Used to override settings
    "zproject/custom_dev_settings.py",
    "zproject/dev_settings.py",
    "zproject/test_extra_settings.py",
    # Better tested in a full deployment
    "zproject/sentry.py",
    "zproject/wsgi.py",
]

# Sorted list of every file matched by source_files that is not also matched
# by an entry of not_yet_fully_covered; main() requires each of these to have
# no lines missing coverage.
enforce_fully_covered = sorted(
    set()
    .union(*(glob.glob(pattern, recursive=True) for pattern in source_files))
    .difference(*(glob.glob(pattern, recursive=True) for pattern in not_yet_fully_covered))
)

# JSON file listing the tests that failed on the previous run.  It is read
# back by get_failed_tests() for --rerun and handed to the test runner as
# failed_tests_path.  The path is relative to the working directory set at
# the top of this file.
FAILED_TEST_PATH = "var/last_test_failure.json"


def get_failed_tests() -> List[str]:
    """Return the tests recorded as failed by the previous run.

    Reads FAILED_TEST_PATH and decodes it as JSON.  If the file cannot be
    read, or its contents are not valid JSON (e.g. a truncated write), a
    message is printed and an empty list is returned, which makes the caller
    fall back to running all tests.
    """
    # Keep the try body limited to the I/O so that only file errors are
    # reported as a missing file.
    try:
        with open(FAILED_TEST_PATH, "rb") as f:
            contents = f.read()
    except OSError:
        print(f"{FAILED_TEST_PATH} doesn't exist; running all tests.")
        return []

    try:
        return orjson.loads(contents)
    except ValueError:
        # orjson.JSONDecodeError is a subclass of ValueError.
        print(f"{FAILED_TEST_PATH} is not valid JSON; running all tests.")
        return []


@contextlib.contextmanager
def block_internet() -> Iterator[None]:
    """Context manager under which the test suite runs with network access blocked.

    While active, a responses.RequestsMock is installed and the exception the
    responses library raises for unregistered URLs is swapped for
    ZulipInternetBlockedError.
    """
    # Monkey-patching - responses library raises requests.ConnectionError when access to an
    # unregistered URL is attempted. We want to replace that with our own exception, so that
    # it propagates all the way.
    swap_connection_error = mock.patch.object(
        responses, "ConnectionError", new=ZulipInternetBlockedError
    )
    # The patch is entered first and the RequestsMock second; all tests run inside both, so
    # any code attempting to access the internet gets the error described above.
    with swap_connection_error, responses.RequestsMock():
        yield


class ZulipInternetBlockedError(Exception):
    """Raised in place of the responses library's ConnectionError during tests.

    block_internet() patches this class over ``responses.ConnectionError`` so
    that an attempted outgoing request produces a Zulip-specific explanation.
    """

    def __init__(self, original_msg: str) -> None:
        """Build the message from Zulip's explanation plus the original error text.

        original_msg: the message the responses library supplied for the error.
        """
        zulip_msg = (
            "Outgoing network requests are not allowed in the Zulip tests. "
            # The trailing space keeps the URL from being glued onto "here:".
            "More details and advice are available here: "
            "https://zulip.readthedocs.io/en/latest/testing/testing.html#internet-access-inside-test-suites"
        )
        msg = f"{zulip_msg}\nResponses library error message: {original_msg}"
        super().__init__(msg)


def main() -> None:
    """Parse the command line, run the backend test suite, and exit.

    Steps, in order: parse options; translate the positional test arguments
    (file paths, bare module names, class names, Class.test_method) into
    dotted suite names; optionally start coverage and profiling; set up
    Django and the test databases; run the tests with network access
    blocked; then run the template and coverage checks and write the
    requested reports.

    The process always terminates via sys.exit(): the exit status is 0 when
    nothing failed and 1 otherwise.  Invalid command-line options exit through
    argparse with a usage error.
    """
    # os.cpu_count() may return None when the count cannot be determined;
    # fall back to serial mode rather than failing on `parallel > 1` below.
    default_parallel = os.cpu_count() or 1

    # Remove proxy settings for running backend tests
    os.environ.pop("http_proxy", "")
    os.environ.pop("https_proxy", "")

    from tools.lib.test_script import (
        add_provision_check_override_param,
        assert_provisioning_status_ok,
    )
    from zerver.lib.test_fixtures import (
        remove_test_run_directories,
        update_test_databases_if_required,
    )

    os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.test_settings"
    # "-u" uses unbuffered IO, which is important when wrapping it in subprocess
    os.environ["PYTHONUNBUFFERED"] = "y"

    usage = """test-backend [options]
    test-backend # Runs all backend tests
    test-backend zerver.tests.test_markdown # run all tests in a test module
    test-backend zerver/tests/test_markdown.py # run all tests in a test module
    test-backend test_markdown # run all tests in a test module
    test-backend zerver.tests.test_markdown.MarkdownTest # run all tests in a test class
    test-backend MarkdownTest # run all tests in a test class
    test-backend zerver.tests.test_markdown.MarkdownTest.test_inline_youtube # run a single test
    test-backend MarkdownTest.test_inline_youtube # run a single test"""

    parser = argparse.ArgumentParser(
        description=usage, formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument(
        "-x",
        "--stop",
        action="store_true",
        dest="fatal_errors",
        help="Stop running tests after the first failure.",
    )
    parser.add_argument("--coverage", action="store_true", help="Compute test coverage.")
    parser.add_argument(
        "--verbose-coverage", action="store_true", help="Enable verbose print of coverage report."
    )
    parser.add_argument(
        "--xml-report", action="store_true", help="Enable (slow) XML coverage report."
    )
    parser.add_argument(
        "--no-html-report", action="store_true", help="Disable (slow) HTML coverage report."
    )
    parser.add_argument(
        "--no-cov-cleanup", action="store_true", help="Do not clean generated coverage files."
    )

    parser.add_argument(
        "--parallel",
        dest="processes",
        type=int,
        default=None,
        help="Specify the number of processes to run the "
        "tests in. Default is the number of logical CPUs",
    )
    parser.add_argument("--profile", action="store_true", help="Profile test runtime.")
    add_provision_check_override_param(parser)
    parser.add_argument(
        "--no-shallow",
        action="store_true",
        help="Don't allow shallow testing of templates (deprecated)",
    )
    parser.add_argument("--verbose", action="store_true", help="Show detailed output")
    parser.add_argument("--reverse", action="store_true", help="Run tests in reverse order.")
    parser.add_argument(
        "--rerun",
        action="store_true",
        help="Run the tests which failed the last time test-backend was run.  Implies not --stop.",
    )
    parser.add_argument(
        "--include-webhooks",
        action="store_true",
        help="Include webhook tests.  By default, they are skipped for performance.",
    )
    parser.add_argument(
        "--include-transaction-tests",
        action="store_true",
        help="Include transaction tests.  By default, they are skipped for performance.",
    )
    parser.add_argument(
        "--generate-stripe-fixtures",
        action="store_true",
        help="Generate Stripe test fixtures by making requests to Stripe test network",
    )
    parser.add_argument("args", nargs="*")
    parser.add_argument(
        "--ban-console-output",
        action="store_true",
        help="Require stdout and stderr to be clean of unexpected output.",
    )

    options = parser.parse_args()
    if options.ban_console_output:
        os.environ["BAN_CONSOLE_OUTPUT"] = "1"

    args = options.args
    # A coverage run always includes the webhook and transaction tests.
    include_webhooks = options.coverage or options.include_webhooks
    include_transaction_tests = options.coverage or options.include_transaction_tests

    if options.processes is not None and options.processes < 1:
        # parser.error() prints the usage plus this message and exits, instead
        # of surfacing a raw traceback for what is a command-line mistake.
        parser.error("option processes: Only positive integers are allowed.")

    test_dirs = ["zerver/tests/", "corporate/tests", "analytics/tests"]

    # While running --rerun, we read var/last_test_failure.json to get
    # the list of tests that failed on the last run, and then pretend
    # those tests were passed explicitly.  --rerun implies
    # !fatal_errors, so that we don't end up removing tests from
    # the list that weren't run.
    if options.rerun:
        default_parallel = 1
        options.fatal_errors = False
        failed_tests = get_failed_tests()
        if failed_tests:
            args = failed_tests
    if len(args) > 0:
        # If we passed a specific set of tests, run in serial mode.
        default_parallel = 1

        # to transform forward slashes '/' present in the argument into dots '.'
        for i, suite in enumerate(args):
            args[i] = suite.rstrip("/").replace("/", ".")

        def rewrite_arguments(index: int, suite: str, search_key: str) -> None:
            """Replace args[index] with "<test file path minus 'py'><suite>".

            The test directories are searched for the first file containing
            search_key; args[index] is left untouched when nothing matches.
            The index and suite are passed explicitly so that each argument
            is rewritten in its own slot.
            """
            for test_dir in test_dirs:
                for root, _dirs, files_names in os.walk(test_dir, topdown=False):
                    for file_name in files_names:
                        # Check for files starting with alphanumeric characters and ending with '.py'
                        # Ignore backup files if any
                        if not file_name[0].isalnum() or not file_name.endswith(".py"):
                            continue
                        filepath = os.path.join(root, file_name)
                        with open(filepath) as f:
                            for line in f:
                                if search_key not in line:
                                    continue
                                args[index] = filepath.replace(".py", ".") + suite
                                return

        # Arguments starting with an uppercase letter are treated as test
        # class names, optionally followed by ".test_method".
        for i, suite in enumerate(args):
            if suite[0].isupper() and "test_" in suite:
                classname = suite.rsplit(".", 1)[0]
                rewrite_arguments(i, suite, classname)
            elif suite[0].isupper():
                rewrite_arguments(i, suite, f"class {suite}(")

        # Arguments starting with "test" are treated as bare test module
        # names and replaced with the path of the matching file.
        for i, suite in enumerate(args):
            if suite.startswith("test"):
                for test_dir in test_dirs:
                    for root, _dirs, files_names in os.walk(test_dir):
                        for file_name in files_names:
                            if file_name in (suite, f"{suite}.py"):
                                new_suite = os.path.join(root, file_name)
                                args[i] = new_suite
                                break

        for i, suite in enumerate(args):
            args[i] = suite.replace(".py", "")

        # to transform forward slashes '/' introduced by the zerver_test_dir into dots '.'
        # taking care of any forward slashes that might be present
        for i, suite in enumerate(args):
            args[i] = suite.replace("/", ".")

    full_suite = len(args) == 0

    if full_suite:
        suites = [
            "zerver.tests",
            "analytics.tests",
            "corporate.tests",
        ]
    else:
        suites = args

    if full_suite and include_webhooks:
        suites.append("zerver.webhooks")

    if full_suite and include_transaction_tests:
        suites.append("zerver.transaction_tests")

    if options.generate_stripe_fixtures:
        if full_suite:
            suites = [
                "corporate.tests.test_stripe",
            ]
            full_suite = False
        os.environ["GENERATE_STRIPE_FIXTURES"] = "1"

    assert_provisioning_status_ok(options.skip_provision_check)

    if options.coverage:
        import coverage

        cov = coverage.Coverage(
            data_suffix="", config_file="tools/coveragerc", concurrency="multiprocessing"
        )
        # Do not clean .coverage file in continuous integration job so that coverage data can be uploaded.
        if not options.no_cov_cleanup:
            import atexit

            atexit.register(cov.erase)  # Ensure the data file gets cleaned up at the end.
        cov.start()
    if options.profile:
        import cProfile

        prof = cProfile.Profile()
        prof.enable()

    # This is kind of hacky, but it's the most reliable way
    # to make sure instrumentation decorators know the
    # setting when they run.
    os.environ["TEST_INSTRUMENT_URL_COVERAGE"] = "TRUE"

    # setup() needs to be called after coverage is started to get proper coverage reports of model
    # files, since part of setup is importing the models for all applications in INSTALLED_APPS.
    django.setup()

    update_test_databases_if_required()

    subprocess.check_call(["tools/webpack", "--test"])

    # isinstance check cannot be used with types. This can potentially improved by supporting
    # dynamic resolution of the test runner type with the django-stubs mypy plugin.
    TestRunner = cast("Type[Runner]", get_runner(settings))

    # An explicit --parallel wins; otherwise profiling forces serial mode.
    if options.processes:
        parallel = options.processes
    elif options.profile:
        parallel = 1
    else:
        parallel = default_parallel
    if parallel > 1:
        print(f"-- Running tests in parallel mode with {parallel} processes.", flush=True)
    else:
        print("-- Running tests in serial mode.", flush=True)

    with block_internet():
        test_runner = TestRunner(
            failfast=options.fatal_errors,
            verbosity=2,
            parallel=parallel,
            reverse=options.reverse,
            keepdb=True,
        )
        failures = test_runner.run_tests(
            suites,
            failed_tests_path=FAILED_TEST_PATH,
            full_suite=full_suite,
            include_webhooks=include_webhooks,
        )

    templates_not_rendered = test_runner.get_shallow_tested_templates()
    # We only check the templates if all the tests ran and passed
    if not failures and full_suite and templates_not_rendered:
        missed_count = len(templates_not_rendered)
        print(f"\nError: {missed_count} templates have no tests!")
        for template in templates_not_rendered:
            print(f"  {template}")
        print("See zerver/tests/test_templates.py for the exclude list.")
        failures = True

    if options.coverage:
        cov.stop()
        cov.save()
        cov.combine()
        cov.save()
        if options.verbose_coverage:
            print("Printing coverage data")
            cov.report(show_missing=False)
        if options.xml_report:
            print("Writing XML report")
            cov.xml_report(outfile="var/coverage.xml")
            print("XML report saved; see var/coverage.xml")

    if full_suite and not failures and options.coverage:
        # Assert that various files have full coverage
        for path in enforce_fully_covered:
            missing_lines = cov.analysis2(path)[3]
            if len(missing_lines) > 0:
                print(f"ERROR: {path} no longer has complete backend test coverage")
                print(f"  Lines missing coverage: {missing_lines}")
                print()
                failures = True
        if failures:
            print("It looks like your changes lost 100% test coverage in one or more files")
            print("Usually, the right fix for this is to add some tests.")
            print("But also check out the include/exclude lists in tools/test-backend.")
            print("If this line intentionally is not tested, you can use a # nocoverage comment.")
            print("To run this check locally, use `test-backend --coverage`.")
        # Conversely, flag exempted files that have reached full coverage so
        # they get removed from the exemption list.
        ok = True
        for path in not_yet_fully_covered:
            try:
                missing_lines = cov.analysis2(path)[3]
                if not missing_lines:
                    print(
                        f"ERROR: {path} has complete backend test coverage but is still in not_yet_fully_covered."
                    )
                    ok = False
            except coverage.misc.NoSource:
                continue
        if ok:
            print("Coverage checks pass!")
        else:
            print()
            print(
                "There are one or more fully covered files that are still in not_yet_fully_covered."
            )
            print("Remove the file(s) from not_yet_fully_covered in `tools/test-backend`.")
            failures = True

    if options.coverage and not options.no_html_report:
        # We do this late, because it can take quite a while.
        print("Writing HTML report")
        cov.html_report(directory="var/coverage", show_contexts=True)
        print("HTML report saved; visit at http://127.0.0.1:9991/coverage/index.html")

    if options.profile:
        prof.disable()
        # delete=False: the profile data must outlive this process so it can be inspected.
        with tempfile.NamedTemporaryFile(prefix="profile.data.", delete=False) as stats_file:
            prof.dump_stats(stats_file.name)
            print(f"Profile data saved to {stats_file.name}")
            print(f"You can visualize it using e.g. `snakeviz {shlex.quote(stats_file.name)}`")
            print("Note: If you are using vagrant for development environment you will need to do:")
            print("1.) `vagrant ssh -- -L 8080:127.0.0.1:8080`")
            print(f"2.) `snakeviz -s {shlex.quote(stats_file.name)}`")

    # Ideally, we'd check for any leaked test databases here;
    # but that needs some hackery with database names.
    #
    # destroy_leaked_test_databases()

    removed = remove_test_run_directories()
    if removed:
        print(f"Removed {removed} stale test run directories!")

    # We'll have printed whether tests passed or failed above
    sys.exit(bool(failures))


# Run the tests only when this file is executed as a script; main() ends the
# process via sys.exit() with the overall pass/fail status.
if __name__ == "__main__":
    main()
